package delayqueue

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

// NewQueue creates a new queue. Use DelayQueue.StartConsume to consume messages or DelayQueue.SendScheduleMsg to publish a message.
//
//	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
//	    // The callback returns true to confirm successful consumption.
//	    // If the callback returns false or does not return within maxConsumeDuration, DelayQueue will re-deliver this message.
//	    return true
//	})

// wrapErr converts the go-redis sentinel redis.Nil into this package's NilErr.
// Every other value, including a nil error, is returned unchanged.
func wrapErr(err error) error {
	if err != redis.Nil {
		return err
	}
	return NilErr
}

// RedisV8Client is the client surface NewRedisV8Wrapper accepts: the go-redis
// v8 command set (redis.Cmdable) combined with the pub/sub methods declared by
// Subscriber.
type RedisV8Client interface {
	redis.Cmdable
	Subscriber
}

// Subscriber declares the pub/sub operations a client must provide, using the
// go-redis v8 signatures.
type Subscriber interface {
	// Subscribe returns a *redis.PubSub for the given channels.
	Subscribe(ctx context.Context, channels ...string) *redis.PubSub
	// Publish sends message to channel and returns the resulting *redis.IntCmd.
	Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
}

// RedisV8Wrapper adapts a go-redis v8 client. Command methods are promoted
// from the embedded RedisV8Cmder, while the Publish and Subscribe methods use
// the cli field directly.
type RedisV8Wrapper struct {
	RedisV8Cmder
	// cli is the underlying client used for pub/sub calls.
	cli RedisV8Client
}

// NewRedisV8Wrapper builds a RedisV8Wrapper around cli. The same client backs
// both the embedded RedisV8Cmder and the wrapper's pub/sub field.
func NewRedisV8Wrapper(cli RedisV8Client) *RedisV8Wrapper {
	wrapper := new(RedisV8Wrapper)
	wrapper.RedisV8Cmder = RedisV8Cmder{Cmdable: cli}
	wrapper.cli = cli
	return wrapper
}

// RedisV8Cmder wraps a redis.Cmdable and exposes context-free command methods
// (Eval, Set, Get, ...) that run with context.Background().
type RedisV8Cmder struct {
	redis.Cmdable
}

// Eval runs script through Cmdable.Eval with the given keys and args, using a
// background context. It returns the command result; the error is passed
// through wrapErr.
func (r *RedisV8Cmder) Eval(script string, keys []string, args []interface{}) (interface{}, error) {
	cmd := r.Cmdable.Eval(context.Background(), script, keys, args...)
	result, err := cmd.Result()
	return result, wrapErr(err)
}

// Set stores value under key via Cmdable.Set, forwarding expiration unchanged
// and using a background context. The command error is passed through wrapErr.
func (r *RedisV8Cmder) Set(key string, value string, expiration time.Duration) error {
	status := r.Cmdable.Set(context.Background(), key, value, expiration)
	return wrapErr(status.Err())
}

// Get reads key via Cmdable.Get with a background context. It returns the
// command's string result; the error is passed through wrapErr, so redis.Nil
// is reported as NilErr.
func (r *RedisV8Cmder) Get(key string) (string, error) {
	cmd := r.Cmdable.Get(context.Background(), key)
	value, err := cmd.Result()
	return value, wrapErr(err)
}

// Del issues Cmdable.Del for all of keys with a background context and returns
// the command error passed through wrapErr.
func (r *RedisV8Cmder) Del(keys []string) error {
	cmd := r.Cmdable.Del(context.Background(), keys...)
	return wrapErr(cmd.Err())
}

// HSet issues Cmdable.HSet on key with a background context; values are passed
// through unchanged. The command error is returned via wrapErr.
func (r *RedisV8Cmder) HSet(key string, values ...interface{}) error {
	cmd := r.Cmdable.HSet(context.Background(), key, values...)
	return wrapErr(cmd.Err())
}

// HDel issues Cmdable.HDel for the given fields of key with a background
// context and returns the command error passed through wrapErr.
func (r *RedisV8Cmder) HDel(key string, fields []string) error {
	cmd := r.Cmdable.HDel(context.Background(), key, fields...)
	return wrapErr(cmd.Err())
}

// SMembers issues Cmdable.SMembers for key with a background context. It
// returns the command's string slice; the error is passed through wrapErr.
func (r *RedisV8Cmder) SMembers(key string) ([]string, error) {
	cmd := r.Cmdable.SMembers(context.Background(), key)
	members, err := cmd.Result()
	return members, wrapErr(err)
}

// SRem issues Cmdable.SRem for members of key with a background context and
// returns the command error passed through wrapErr.
func (r *RedisV8Cmder) SRem(key string, members []string) error {
	// Cmdable.SRem takes ...interface{}, so the strings are boxed one by one.
	boxed := make([]interface{}, 0, len(members))
	for _, member := range members {
		boxed = append(boxed, member)
	}
	cmd := r.Cmdable.SRem(context.Background(), key, boxed...)
	return wrapErr(cmd.Err())
}

// ZAdd issues Cmdable.ZAdd on key with a background context. Each entry of
// values becomes one *redis.Z, with the map key as Member and the map value as
// Score. The command error is returned via wrapErr.
func (r *RedisV8Cmder) ZAdd(key string, values map[string]float64) error {
	entries := make([]*redis.Z, 0, len(values))
	for member, score := range values {
		entry := &redis.Z{Member: member, Score: score}
		entries = append(entries, entry)
	}
	cmd := r.Cmdable.ZAdd(context.Background(), key, entries...)
	return wrapErr(cmd.Err())
}

// ZRem issues Cmdable.ZRem for members of key with a background context and
// returns the command error passed through wrapErr.
func (r *RedisV8Cmder) ZRem(key string, members []string) error {
	// Cmdable.ZRem takes ...interface{}, so the strings are boxed one by one.
	boxed := make([]interface{}, len(members))
	for idx := range members {
		boxed[idx] = members[idx]
	}
	cmd := r.Cmdable.ZRem(context.Background(), key, boxed...)
	return wrapErr(cmd.Err())
}

// ZCard issues Cmdable.ZCard for key with a background context and returns the
// command's integer result. The error is passed through wrapErr, matching the
// other RedisV8Cmder methods.
func (r *RedisV8Cmder) ZCard(key string) (int64, error) {
	count, err := r.Cmdable.ZCard(context.Background(), key).Result()
	return count, wrapErr(err)
}

// LLen issues Cmdable.LLen for key with a background context and returns the
// command's integer result. The error is passed through wrapErr, matching the
// other RedisV8Cmder methods.
func (r *RedisV8Cmder) LLen(key string) (int64, error) {
	length, err := r.Cmdable.LLen(context.Background(), key).Result()
	return length, wrapErr(err)
}

// Pipelined opens a pipeline on the underlying client, lets fn queue commands
// on it through a pipeline-backed Cmder, and then sends them with one Exec
// call.
//
// If fn returns an error, that error is returned and Exec is not called.
// Otherwise the error from Exec is returned after passing through wrapErr.
// The pipeline is closed before Pipelined returns in either case.
func (r *RedisV8Wrapper) Pipelined(fn func(Cmder) error) error {
	pipe := r.Cmdable.Pipeline()
	defer pipe.Close()
	// Do not execute a batch whose construction was reported as failed.
	if err := fn(&RedisV8Cmder{pipe}); err != nil {
		return err
	}
	_, err := pipe.Exec(context.Background())
	return wrapErr(err)
}

// Publish sends payload to channel through the underlying client with a
// background context. The command error is passed through wrapErr, matching
// the other command wrappers in this file.
func (r *RedisV8Wrapper) Publish(channel string, payload string) error {
	cmd := r.cli.Publish(context.Background(), channel, payload)
	return wrapErr(cmd.Err())
}

// Subscribe subscribes to channel on the underlying client and forwards the
// payload of every received message to the returned channel.
//
// The returned func stops the subscription: it signals the forwarding
// goroutine to stop and closes the underlying *redis.PubSub. It may be called
// more than once. Once forwarding stops, the returned channel is closed so
// that consumers ranging over it terminate.
//
// The returned error is always nil.
func (r *RedisV8Wrapper) Subscribe(channel string) (<-chan string, func(), error) {
	sub := r.cli.Subscribe(context.Background(), channel)
	// stopCtx only signals shutdown to the forwarding goroutine; cancelling a
	// context is idempotent, so the returned closer is safe to call repeatedly.
	stopCtx, stop := context.WithCancel(context.Background())
	closer := func() {
		stop()
		_ = sub.Close()
	}
	msgs := sub.Channel()
	resultChan := make(chan string) // sub.Channel() has its own buffer
	go func() {
		// Closing resultChan lets consumers observe the end of the stream.
		defer close(resultChan)
		defer stop()
		for msg := range msgs {
			select {
			case resultChan <- msg.Payload:
			case <-stopCtx.Done():
				// The consumer may have stopped reading; do not block forever.
				return
			}
		}
	}()

	return resultChan, closer, nil
}
